In [1]:
# %%bash

# pip install tensorflow==1.7
# pip install google-cloud-dataflow==2.3
# pip install tensorflow-hub

Text Classification using TensorFlow and Google Cloud - Part 1

The bigquery-public-data:hacker_news dataset contains all stories and comments from Hacker News since its launch in 2006. Each story includes a story id, a url, the title of the story, the author who made the post, when it was written, and the number of points the story received.

The objective is to build an ML model that, given the title of a story, predicts the source of that story.

Data preparation with tf.Transform and Cloud Dataflow

This notebook illustrates how to build a Beam pipeline that uses tf.Transform to prepare the ML 'train' and 'eval' datasets. The pipeline includes the following steps:

  1. Read data from BigQuery
  2. Extract and clean features from the BQ rows
  3. Use tf.Transform to process the text and produce the following features for each entry:
    • title: raw text - string
    • bow: bag-of-words indices - sparse vector of integers
    • weight: TF-IDF values - sparse vector of floats
    • source: target feature - string
  4. Save the data as .tfrecord files

Setting Global Parameters


In [2]:
import os

class Params:
    pass

# Set to run on GCP
Params.GCP_PROJECT_ID = 'ksalama-gcp-playground'
Params.REGION = 'europe-west1'
Params.BUCKET = 'ksalama-gcs-cloudml'

Params.PLATFORM = 'local' # local | GCP

Params.DATA_DIR = 'data/news' if Params.PLATFORM == 'local' else 'gs://{}/data/news'.format(Params.BUCKET)

Params.TRANSFORMED_DATA_DIR = os.path.join(Params.DATA_DIR, 'transformed')
Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX = os.path.join(Params.TRANSFORMED_DATA_DIR, 'train')
Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX = os.path.join(Params.TRANSFORMED_DATA_DIR, 'eval')

Params.TEMP_DIR = os.path.join(Params.DATA_DIR, 'tmp')

Params.MODELS_DIR = 'models/news' if Params.PLATFORM == 'local' else 'gs://{}/models/news'.format(Params.BUCKET)

Params.TRANSFORM_ARTEFACTS_DIR = os.path.join(Params.MODELS_DIR,'transform')

Params.TRANSFORM = True

Importing libraries


In [3]:
import apache_beam as beam

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.coders as tft_coders

from tensorflow.contrib.learn.python.learn.utils import input_fn_utils

from tensorflow_transform.beam import impl
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.saved import saved_transform_io


WARNING:tensorflow:From /Users/khalidsalama/Technology/python-venvs/py27-venv/lib/python2.7/site-packages/tensorflow/contrib/learn/python/learn/datasets/base.py:198: retry (from tensorflow.contrib.learn.python.learn.datasets.base) is deprecated and will be removed in a future version.
Instructions for updating:
Use the retry module or similar alternatives.

1. Source Query


In [4]:
bq_query = '''
SELECT
    key,
    REGEXP_REPLACE(title, '[^a-zA-Z0-9 $.-]', ' ') AS title, 
    source
FROM
(
    SELECT
        ARRAY_REVERSE(SPLIT(REGEXP_EXTRACT(url, '.*://(.[^/]+)/'), '.'))[OFFSET(1)] AS source,
        title,
        ABS(FARM_FINGERPRINT(title)) AS key
    FROM
      `bigquery-public-data.hacker_news.stories`
    WHERE
      REGEXP_CONTAINS(REGEXP_EXTRACT(url, '.*://(.[^/]+)/'), '.com$')
      AND LENGTH(title) > 10
)
WHERE (source = 'github' OR source = 'nytimes' OR source = 'techcrunch')
'''

def get_source_query(step):
    
    if step == 'train':
        source_query = 'SELECT * FROM ({}) WHERE MOD(key,100) <= 75'.format(bq_query)
    else:
        source_query = 'SELECT * FROM ({}) WHERE MOD(key,100) > 75'.format(bq_query)
        
    return source_query
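
Two details of this query are worth spelling out. The inner SELECT derives source from the URL host and uses a fingerprint of the title as a stable numeric key; the outer MOD(key, 100) filter then produces a deterministic split of roughly 76% train / 24% eval, so the same title always lands in the same set. A minimal Python sketch of both ideas (illustrative only; it uses re and Python's built-in hash in place of the BigQuery REGEXP and FARM_FINGERPRINT functions):

import re

def extract_source(url):
    # Mirror the SQL: take the host between '://' and the next '/',
    # split on '.', reverse, and pick OFFSET(1), e.g. 'techcrunch.com' -> 'techcrunch'.
    host = re.search(r'.*://(.[^/]+)/', url).group(1)
    return list(reversed(host.split('.')))[1]

def assign_split(title):
    # Mirror ABS(FARM_FINGERPRINT(title)) and the MOD(key, 100) <= 75 filter.
    return 'train' if abs(hash(title)) % 100 <= 75 else 'eval'

print(extract_source('https://techcrunch.com/2010/01/01/some-story/'))  # techcrunch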

2. Raw metadata


In [5]:
RAW_HEADER = 'key,title,source'.split(',')
RAW_DEFAULTS = [['NA'],['NA'],['NA']]
TARGET_FEATURE_NAME = 'source'
TARGET_LABELS = ['github', 'nytimes', 'techcrunch']
TEXT_FEATURE_NAME = 'title'
KEY_COLUMN = 'key'

VOCAB_SIZE = 20000
TRAIN_SIZE = 73124
EVAL_SIZE = 23079

DELIMITERS = '.,!?() '

raw_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema({
    KEY_COLUMN: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
    TEXT_FEATURE_NAME: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
    TARGET_FEATURE_NAME: dataset_schema.ColumnSchema(
        tf.string, [], dataset_schema.FixedColumnRepresentation()),
}))
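
For readers more used to tf.Example feature specs, the same raw metadata can be written that way (a sketch, assuming dataset_schema.from_feature_spec is available in this version of tf.Transform); every raw column is a scalar string:

raw_feature_spec = {
    KEY_COLUMN: tf.FixedLenFeature([], tf.string),
    TEXT_FEATURE_NAME: tf.FixedLenFeature([], tf.string),
    TARGET_FEATURE_NAME: tf.FixedLenFeature([], tf.string),
}

raw_metadata_alt = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(raw_feature_spec))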

3. Preprocessing functions


In [6]:
def get_features(bq_row):
    
    CSV_HEADER = 'key,title,source'.split(',')
    
    input_features = {}
    
    for feature_name in CSV_HEADER:
        input_features[feature_name] = str(bq_row[feature_name]).lower()
        
    return input_features


def preprocessing_fn(input_features):
 
    text = input_features[TEXT_FEATURE_NAME]

    text_tokens = tf.string_split(text, DELIMITERS)
    text_token_indices = tft.string_to_int(text_tokens, top_k=VOCAB_SIZE)
    bag_of_words_indices, text_weight = tft.tfidf(text_token_indices, VOCAB_SIZE + 1)
    
    output_features = {}
    output_features[TEXT_FEATURE_NAME] = input_features[TEXT_FEATURE_NAME]
    output_features['bow'] = bag_of_words_indices
    output_features['weight'] = text_weight
    output_features[TARGET_FEATURE_NAME] = input_features[TARGET_FEATURE_NAME]
    
    return output_features
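
Note that tft.string_to_int and tft.tfidf rely on full-pass analyzers, so they only run inside the Beam AnalyzeAndTransformDataset step below. The tokenization step alone can be checked in a plain TF session (a quick sketch with a made-up title, reusing DELIMITERS from the cell above):

with tf.Session() as sess:
    tokens = tf.string_split(tf.constant(['show hn. a new search engine (beta)?']), DELIMITERS)
    print(sess.run(tokens.values))  # ['show' 'hn' 'a' 'new' 'search' 'engine' 'beta']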

4. Beam Pipeline


In [7]:
import apache_beam as beam


def run_pipeline(runner, opts):
    
    print("Sink train data files: {}".format(Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX))
    print("Sink data files: {}".format(Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX))
    print("Temporary directory: {}".format(Params.TEMP_DIR))
    print("")
    
    
    with beam.Pipeline(runner, options=opts) as pipeline:
        with impl.Context(Params.TEMP_DIR): 
    
            ###### analyze & transform train #########################################################
            if(runner=='DirectRunner'):
                print("")
                print("Transform training data....")
                print("")
            
            step = 'train'
            source_query = get_source_query(step)
            
            # Read raw train data from BQ and cleanup
            raw_train_data = (
              pipeline
              | '{} - Read Data from BigQuery'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=source_query, use_standard_sql=True))
              | '{} - Extract Features'.format(step) >> beam.Map(get_features)
            )
            
            # create a train dataset from the data and schema
            raw_train_dataset = (raw_train_data, raw_metadata)
            
            # analyze and transform raw_train_dataset to produce transformed_train_dataset and transform_fn
            transformed_train_dataset, transform_fn = (
                raw_train_dataset 
                | '{} - Analyze & Transform'.format(step) >> impl.AnalyzeAndTransformDataset(preprocessing_fn)
            )
            
            # get data and schema separately from the transformed_train_dataset
            transformed_train_data, transformed_metadata = transformed_train_dataset

            # write transformed train data to sink
            _ = (
                transformed_train_data 
                | '{} - Write Transformed Data as tfrecords'.format(step) >> beam.io.tfrecordio.WriteToTFRecord(
                    file_path_prefix=Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX,
                    file_name_suffix=".tfrecords",
                    num_shards=25,
                    coder=tft_coders.example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
            )
            
            
#             #### TEST write transformed AS TEXT train data to sink
#             _ = (
#                 transformed_train_data 
#                 | '{} - Write Transformed Data as Text'.format(step) >> beam.io.textio.WriteToText(
#                     file_path_prefix=Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX,
#                     file_name_suffix=".csv")
#             )
#             ##################################################


            ###### transform eval ##################################################################
            
            if(runner=='DirectRunner'):
                print("")
                print("Transform eval data....")
                print("")
            
            step = 'eval'
            source_query = get_source_query(step)

            # Read raw eval data from BQ and cleanup
            raw_eval_data = (
              pipeline
              | '{} - Read Data from BigQuery'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=source_query, use_standard_sql=True))
              | '{} - Extract Features'.format(step) >> beam.Map(get_features)
            )
            
            # create an eval dataset from the data and schema
            raw_eval_dataset = (raw_eval_data, raw_metadata)
            
            # transform eval data based on produced transform_fn (from analyzing train_data)
            transformed_eval_dataset = (
                (raw_eval_dataset, transform_fn) 
                | '{} - Transform'.format(step) >> impl.TransformDataset()
            )
            
            # get data from the transformed_eval_dataset
            transformed_eval_data, _ = transformed_eval_dataset
            
            # write transformed eval data to sink
            _ = (
                transformed_eval_data 
                | '{} - Write Transformed Data'.format(step) >> beam.io.tfrecordio.WriteToTFRecord(
                    file_path_prefix=Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX,
                    file_name_suffix=".tfrecords",
                    num_shards=10,
                    coder=tft_coders.example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
            )
        
            ###### write transformation metadata #######################################################
            if(runner=='DirectRunner'):
                print("")
                print("Saving transformation artefacts ....")
                print("")
            
            # write transform_fn as tf.graph
            _ = (
                transform_fn 
                | 'Write Transform Artefacts' >> transform_fn_io.WriteTransformFn(Params.TRANSFORM_ARTEFACTS_DIR)
            )

    # The `with` block above submits and waits for the pipeline on exit,
    # so no explicit pipeline.run() is needed for either runner.

5. Run Pipeline


In [8]:
from datetime import datetime
import shutil

job_name = 'preprocess-hackernews-data' + '-' + datetime.utcnow().strftime('%y%m%d-%H%M%S')

options = {
    'region': Params.REGION,
    'staging_location': os.path.join(Params.TEMP_DIR, 'staging'),
    'temp_location': Params.TEMP_DIR,
    'job_name': job_name,
    'project': Params.GCP_PROJECT_ID
}

tf.logging.set_verbosity(tf.logging.ERROR)

opts = beam.pipeline.PipelineOptions(flags=[], **options)
runner = 'DirectRunner' if Params.PLATFORM == 'local' else 'DataflowRunner'

if Params.TRANSFORM:
    
    if Params.PLATFORM == 'local':
        shutil.rmtree(Params.TRANSFORMED_DATA_DIR, ignore_errors=True)
        shutil.rmtree(Params.TRANSFORM_ARTEFACTS_DIR, ignore_errors=True)
        shutil.rmtree(Params.TEMP_DIR, ignore_errors=True)
    
    print 'Launching {} job {} ... hang on'.format(runner, job_name)
    
    run_pipeline(runner, opts)
    
    print "Pipline completed."
else:
    print "Transformation skipped!"


Launching DirectRunner job preprocess-hackernews-data-180514-115222 ... hang on
Sink train data files: data/news/transformed/train
Sink eval data files: data/news/transformed/eval
Temporary directory: data/news/tmp


Transform training data....


Transform eval data....


Saving transformation artefacts ....

/Users/khalidsalama/Technology/python-venvs/py27-venv/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py:337: DeprecationWarning: options is deprecated since First stable release.. References to <pipeline>.options will not be supported
  pipeline.replace_all(_get_transform_overrides(pipeline.options))
WARNING:root:Dataset ksalama-gcp-playground:temp_dataset_151e64fa07a3490bae91dd844ce4b7da does not exist so we will create it as temporary with location=None
WARNING:root:Dataset ksalama-gcp-playground:temp_dataset_f3701d6e27e14e068968a255f43c4b8c does not exist so we will create it as temporary with location=None
Pipeline completed.

In [9]:
%%bash

echo "** transformed data:"
ls data/news/transformed
echo ""

echo "** transform artefacts:"
ls models/news/transform
echo ""

echo "** transform assets:"
ls models/news/transform/transform_fn/assets
echo ""

head models/news/transform/transform_fn/assets/vocab_string_to_int_uniques


** transformed data:
eval-00000-of-00010.tfrecords
eval-00001-of-00010.tfrecords
eval-00002-of-00010.tfrecords
eval-00003-of-00010.tfrecords
eval-00004-of-00010.tfrecords
eval-00005-of-00010.tfrecords
eval-00006-of-00010.tfrecords
eval-00007-of-00010.tfrecords
eval-00008-of-00010.tfrecords
eval-00009-of-00010.tfrecords
train-00000-of-00025.tfrecords
train-00001-of-00025.tfrecords
train-00002-of-00025.tfrecords
train-00003-of-00025.tfrecords
train-00004-of-00025.tfrecords
train-00005-of-00025.tfrecords
train-00006-of-00025.tfrecords
train-00007-of-00025.tfrecords
train-00008-of-00025.tfrecords
train-00009-of-00025.tfrecords
train-00010-of-00025.tfrecords
train-00011-of-00025.tfrecords
train-00012-of-00025.tfrecords
train-00013-of-00025.tfrecords
train-00014-of-00025.tfrecords
train-00015-of-00025.tfrecords
train-00016-of-00025.tfrecords
train-00017-of-00025.tfrecords
train-00018-of-00025.tfrecords
train-00019-of-00025.tfrecords
train-00020-of-00025.tfrecords
train-00021-of-00025.tfrecords
train-00022-of-00025.tfrecords
train-00023-of-00025.tfrecords
train-00024-of-00025.tfrecords

** transform artefacts:
transform_fn
transformed_metadata

** transform assets:
vocab_string_to_int_uniques

the
a
to
for
in
of
and
s
on
with
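
As a final sanity check, one of the transformed shards can be read back directly (a sketch; the path matches the listing above, and only the dense string features are printed - 'bow' and 'weight' are the sparse ones):

import tensorflow as tf

for serialized in tf.python_io.tf_record_iterator('data/news/transformed/train-00000-of-00025.tfrecords'):
    example = tf.train.Example.FromString(serialized)
    print(example.features.feature['title'].bytes_list.value)
    print(example.features.feature['source'].bytes_list.value)
    break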

In [ ]: